Skip to content

Commit

Permalink
updated redis ur port = 6380
Browse files Browse the repository at this point in the history
fixed update in mysql
added logger=none for docker run (sources synchronization)
  • Loading branch information
Sergey Burykin committed Dec 13, 2021
1 parent 821861f commit 626abfd
Show file tree
Hide file tree
Showing 7 changed files with 61 additions and 15 deletions.
6 changes: 3 additions & 3 deletions compose-data/redis_users_recognition/redis.conf
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@
# IF YOU ARE SURE YOU WANT YOUR INSTANCE TO LISTEN TO ALL THE INTERFACES
# JUST COMMENT OUT THE FOLLOWING LINE.
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
bind 127.0.0.1 -::1
#bind 127.0.0.1 -::1

# Protected mode is a layer of security protection, in order to avoid that
# Redis instances left open on the internet are accessed and exploited.
Expand All @@ -91,11 +91,11 @@ bind 127.0.0.1 -::1
# you are sure you want clients from other hosts to connect to Redis
# even if no authentication is configured, nor a specific set of interfaces
# are explicitly listed using the "bind" directive.
protected-mode yes
protected-mode no

# Accept connections on the specified port, default is 6379 (IANA #815344).
# If port 0 is specified Redis will not listen on a TCP socket.
port 6379
port 6380

# TCP listen() backlog.
#
Expand Down
2 changes: 1 addition & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ services:
- REDIS_URL=redis://redis:6379
#Retroactive users recognition can affect RAM significant. Read more about the solution https://jitsu.com/docs/other-features/retroactive-user-recognition
- USER_RECOGNITION_ENABLED=true
- USER_RECOGNITION_REDIS_URL=redis://redis_users_recognition:6379
- USER_RECOGNITION_REDIS_URL=redis://redis_users_recognition:6380
- TERM=xterm-256color
depends_on:
- redis
Expand Down
24 changes: 24 additions & 0 deletions server/adapters/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ const (
mySQLCreateDBIfNotExistsTemplate = "CREATE DATABASE IF NOT EXISTS `%s`"
mySQLCreateTableTemplate = "CREATE TABLE `%s`.`%s` (%s)"
mySQLInsertTemplate = "INSERT INTO `%s`.`%s` (%s) VALUES %s"
mySQLUpdateTemplate = "UPDATE `%s`.`%s` SET %s WHERE %s=?"
mySQLAlterPrimaryKeyTemplate = "ALTER TABLE `%s`.`%s` ADD CONSTRAINT %s PRIMARY KEY (%s)"
mySQLMergeTemplate = "INSERT INTO `%s`.`%s` (%s) VALUES %s ON DUPLICATE KEY UPDATE %s"
mySQLBulkMergeTemplate = "INSERT INTO `%s`.`%s` (%s) SELECT * FROM (SELECT %s FROM `%s`.`%s`) AS tmp ON DUPLICATE KEY UPDATE %s"
Expand Down Expand Up @@ -218,6 +219,29 @@ func (m *MySQL) BulkUpdate(table *Table, objects []map[string]interface{}, delet
return wrappedTx.DirectCommit()
}

//Update one record in MySQL
func (m *MySQL) Update(table *Table, object map[string]interface{}, whereKey string, whereValue interface{}) error {
columns := make([]string, len(object), len(object))
values := make([]interface{}, len(object)+1, len(object)+1)
i := 0
for name, value := range object {
columns[i] = m.quote(name) + "= ?"
values[i] = value
i++
}
values[i] = whereValue

statement := fmt.Sprintf(mySQLUpdateTemplate, m.config.Db, table.Name, strings.Join(columns, ", "), whereKey)
m.queryLogger.LogQueryWithValues(statement, values)
_, err := m.dataSource.ExecContext(m.ctx, statement, values...)
if err != nil {
err = checkErr(err)
return fmt.Errorf("Error updating %s table with statement: %s values: %v: %v", table.Name, statement, values, err)
}

return nil
}

//DropTable drops table in transaction
func (m *MySQL) DropTable(table *Table) error {
wrappedTx, err := m.OpenTx()
Expand Down
10 changes: 5 additions & 5 deletions server/airbyte/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func (r *Runner) Spec() (interface{}, error) {
resultParser := &synchronousParser{desiredRowType: SpecType}
errWriter := logging.NewStringWriter()

err := r.run(resultParser.parse, copyTo(errWriter), time.Minute*3, "run", "--rm", "-i", "--name", r.identifier, fmt.Sprintf("%s:%s", Instance.AddAirbytePrefix(r.DockerImage), r.Version), "spec")
err := r.run(resultParser.parse, copyTo(errWriter), time.Minute*3, "run", "--rm", "--init", "-i", "--name", r.identifier, "--log-driver", "none", fmt.Sprintf("%s:%s", Instance.AddAirbytePrefix(r.DockerImage), r.Version), "spec")
if err != nil {
if err == runner.ErrNotReady {
return nil, err
Expand Down Expand Up @@ -93,8 +93,8 @@ func (r *Runner) Check(airbyteSourceConfig interface{}) error {
}
}()

err = r.run(resultParser.parse, copyTo(errWriter), time.Minute * 3,
"run", "--rm", "-i", "--name", r.identifier, "-v", fmt.Sprintf("%s:%s", Instance.WorkspaceVolume, VolumeAlias), fmt.Sprintf("%s:%s", Instance.AddAirbytePrefix(r.DockerImage), r.Version), "check", "--config", path.Join(VolumeAlias, relatedFilePath))
err = r.run(resultParser.parse, copyTo(errWriter), time.Minute*3,
"run", "--rm", "--init", "-i", "--name", r.identifier, "--log-driver", "none", "-v", fmt.Sprintf("%s:%s", Instance.WorkspaceVolume, VolumeAlias), fmt.Sprintf("%s:%s", Instance.AddAirbytePrefix(r.DockerImage), r.Version), "check", "--config", path.Join(VolumeAlias, relatedFilePath))
if err != nil {
if err == runner.ErrNotReady {
return err
Expand Down Expand Up @@ -132,7 +132,7 @@ func (r *Runner) Discover(airbyteSourceConfig interface{}, timeout time.Duration
}()

err = r.run(resultParser.parse, copyTo(dualStdErrWriter), timeout,
"run", "--rm", "-i", "--name", r.identifier, "-v", fmt.Sprintf("%s:%s", Instance.WorkspaceVolume, VolumeAlias), fmt.Sprintf("%s:%s", Instance.AddAirbytePrefix(r.DockerImage), r.Version), "discover", "--config", path.Join(VolumeAlias, relatedFilePath))
"run", "--rm", "--init", "-i", "--name", r.identifier, "--log-driver", "none", "-v", fmt.Sprintf("%s:%s", Instance.WorkspaceVolume, VolumeAlias), fmt.Sprintf("%s:%s", Instance.AddAirbytePrefix(r.DockerImage), r.Version), "discover", "--config", path.Join(VolumeAlias, relatedFilePath))
if err != nil {
if err == runner.ErrNotReady {
return nil, err
Expand Down Expand Up @@ -182,7 +182,7 @@ func (r *Runner) Read(dataConsumer base.CLIDataConsumer, streamsRepresentation m

dualStdErrWriter := logging.Dual{FileWriter: taskLogger, Stdout: logging.NewPrefixDateTimeProxy(fmt.Sprintf("[%s]", sourceID), Instance.LogWriter)}

args := []string{"run", "--rm", "-i", "--name", taskCloser.TaskID(), "-v", fmt.Sprintf("%s:%s", Instance.WorkspaceVolume, VolumeAlias), fmt.Sprintf("%s:%s", Instance.AddAirbytePrefix(r.DockerImage), r.Version), "read", "--config", path.Join(VolumeAlias, sourceID, r.DockerImage, base.ConfigFileName), "--catalog", path.Join(VolumeAlias, sourceID, r.DockerImage, base.CatalogFileName)}
args := []string{"run", "--rm", "--init", "-i", "--name", taskCloser.TaskID(), "--log-driver", "none", "-v", fmt.Sprintf("%s:%s", Instance.WorkspaceVolume, VolumeAlias), fmt.Sprintf("%s:%s", Instance.AddAirbytePrefix(r.DockerImage), r.Version), "read", "--config", path.Join(VolumeAlias, sourceID, r.DockerImage, base.ConfigFileName), "--catalog", path.Join(VolumeAlias, sourceID, r.DockerImage, base.CatalogFileName)}

if statePath != "" {
args = append(args, "--state", path.Join(VolumeAlias, sourceID, r.DockerImage, base.StateFileName))
Expand Down
3 changes: 0 additions & 3 deletions server/caching/events_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,9 +146,6 @@ func (ec *EventsCache) put(destinationID, eventID string, value events.Event) {
//delete old if overflow
if eventsInCache > ec.capacityPerDestination {
toDelete := eventsInCache - ec.capacityPerDestination
if toDelete > 2 {
logging.Debugf("[%s] Events cache size: [%d] capacity: [%d] elements to delete: [%d]", destinationID, eventsInCache, ec.capacityPerDestination, toDelete)
}
for i := 0; i < toDelete; i++ {
err := ec.storage.RemoveLastEvent(destinationID)
if err != nil {
Expand Down
29 changes: 27 additions & 2 deletions server/storages/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,9 +199,34 @@ func (m *MySQL) Clean(tableName string) error {
return cleanImpl(m, tableName)
}

//Update uses SyncStore under the hood
//Update updates record in MySQL
func (m *MySQL) Update(objects []map[string]interface{}) error {
return m.SyncStore(nil, objects, "", true)
_, tableHelper := m.getAdapters()
for _, object := range objects {
envelops, err := m.processor.ProcessEvent(object)
if err != nil {
return err
}

for _, envelop := range envelops {
batchHeader := envelop.Header
processedObject := envelop.Event
table := tableHelper.MapTableSchema(batchHeader)

dbSchema, err := tableHelper.EnsureTableWithCaching(m.ID(), table)
if err != nil {
return err
}

start := timestamp.Now()
if err = m.adapter.Update(dbSchema, processedObject, m.uniqueIDField.GetFlatFieldName(), m.uniqueIDField.Extract(object)); err != nil {
return err
}
logging.Debugf("[%s] Updated 1 row in [%.2f] seconds", m.ID(), timestamp.Now().Sub(start).Seconds())
}
}

return nil
}

//GetUsersRecognition returns users recognition configuration
Expand Down
2 changes: 1 addition & 1 deletion server/storages/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ func (p *Postgres) Clean(tableName string) error {
return cleanImpl(p, tableName)
}

//Update uses SyncStore under the hood
//Update updates record in Postgres
func (p *Postgres) Update(objects []map[string]interface{}) error {
_, tableHelper := p.getAdapters()
for _, object := range objects {
Expand Down

0 comments on commit 626abfd

Please sign in to comment.